package server

import (
	"fmt"
	"gitee.com/zhucheer/pub-connect/app/define"
	pubproto "gitee.com/zhucheer/pub-connect/app/proto"
	"io"
	"time"
)

// sendLoop forwards every work item received on sws.watcher.ch to the client
// over the gRPC stream. It returns when that channel is closed, when
// sws.closec becomes readable, or when a Send call fails.
func (sws *serverWatchStream) sendLoop() {
	for {
		select {
		case <-sws.closec:
			return
		// Anything pushed into this channel is delivered to the client.
		case work, open := <-sws.watcher.ch:
			if !open {
				return
			}

			detail := &pubproto.WorkDetail{
				PriKey:      work.PriKey,
				AppName:     work.AppName,
				AppUuid:     work.AppUuid,
				Uuid:        work.Uuid,
				Name:        work.Name,
				Params:      work.Params,
				RequestTime: work.RequestTime,
				Status:      work.Status,
				Results:     work.Results,
			}
			sendErr := sws.gRPCStream.Send(detail)

			// Logged before the error check, so it is printed for failed sends too.
			fmt.Println("==send work===", work)

			if sendErr == nil {
				continue
			}
			fmt.Println("send work err", sendErr, work)
			return
		}
	}
}

// recvLoop reads the messages reported by the client over the gRPC stream and
// dispatches each one according to req.Name.
//
// It returns nil when Recv reports io.EOF, the Recv error for any other
// receive failure, and AGENT_CREATE_ERR when s.addWatcher rejects the watcher.
func (sws *serverWatchStream) recvLoop(s *AgentServer) error {
	for {
		req, err := sws.gRPCStream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("recive result --> work name: %s, uid: %s status is %s,\n", req.Name, req.Uuid, req.Status)

		// notifyServer pushes the current request onto the AgentServer work channel.
		// NOTE(review): this send has no cancellation path; it presumably relies on
		// DoWorkChan always being drained — confirm against the consumer.
		notifyServer := func() {
			s.DoWorkChan <- Work{
				Name:    req.Name,
				PriKey:  req.PriKey,
				AppName: req.AppName,
				Uuid:    req.Uuid,
				Params:  req.Params,
				Results: req.Results,
				Status:  req.Status,
			}
		}

		switch req.Name {
		case define.WorkAgentCreateMsg, define.WorkAgentHeartbeatMsg:
			sws.watcher.appName = req.AppName
			sws.watcher.appUuid = req.AppUuid
			fmt.Println("---recv init action----", req.AppName, req.AppUuid)

			notifyServer()

			// Register the watcher with the server; on failure report the reason
			// to the client through the watcher channel and end the loop.
			if err := s.addWatcher(sws.watcher); err != nil {
				// NOTE(review): this send blocks until something receives from
				// sws.watcher.ch; verify a receiver is still running at this point.
				sws.watcher.ch <- pubproto.WorkDetail{
					PriKey:  req.PriKey,
					AppName: req.AppName,
					Name:    req.Name,
					Status:  define.StatusFailed,
					Results: err.Error(),
				}
				return AGENT_CREATE_ERR
			}
			go s.ConnectAfterDo(req)
		case define.WorkAgentCancelMsg:
			s.cancelWatcher(req.Uuid)
		case define.WorkActivePushMsg:
			// Message pushed proactively by the client: hand it to the server.
			notifyServer()
		default:
			// Any other message is treated as the result of a tracked work item:
			// look it up by Uuid and record the outcome. The mutex guards
			// s.workList here.
			// NOTE(review): the field writes only persist if workList stores
			// pointers — confirm against its declaration.
			s.mutex.Lock()
			if work, ok := s.workList[req.Uuid]; ok {
				work.Status = req.Status
				work.Results = req.Results
				work.ResponseTime = time.Now().Unix()
			}
			s.mutex.Unlock()
		}
	}
}

// close signals shutdown by closing sws.closec, waits for sws.wg to drain,
// and then logs which watcher was closed.
//
// It must be called at most once: closing sws.closec a second time panics.
func (sws *serverWatchStream) close() {
	close(sws.closec)
	sws.wg.Wait()
	// Terminate the line so this message does not run into the next log entry.
	fmt.Printf("watcher %s:%s is closed\n", sws.watcher.appName, sws.watcher.appUuid)
}
